{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Minimal example"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import tuplex\n",
    "import time"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Tuplex context shared by every cell below; the memory sizes and the\n",
    "# executor count are fixed here, so adjust them to the host machine.\n",
    "c = tuplex.Context(\n",
    "    executorMemory='4G',\n",
    "    executorCount=63,\n",
    "    driverMemory='4G',\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Square every element of a small in-memory list and collect the result.\n",
    "(\n",
    "    c.parallelize([1, 2, 3, 4])\n",
    "    .map(lambda x: x * x)\n",
    "    .collect()\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exception handling"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# The (100, 0) pair makes a / b raise ZeroDivisionError for that row.\n",
    "ds = (\n",
    "    c.parallelize([(1, 10), (2, 20), (100, 0), (6, 60)])\n",
    "    .map(lambda a, b: a / b)\n",
    ")\n",
    "\n",
    "ds.collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Show the exception counts that Tuplex exposes on the dataset.\n",
    "ds.exception_counts"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Attach a resolver for ZeroDivisionError that substitutes 0, then collect.\n",
    "(\n",
    "    ds.resolve(ZeroDivisionError, lambda x: 0)\n",
    "    .collect()\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### A more involved query"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Derive an integer price for listing x; the rule depends on x['offer'].\n",
    "def extractPrice(x):\n",
    "    listed = x['price']\n",
    "    offer = x['offer']\n",
    "\n",
    "    if offer == 'sold':\n",
    "        # sold listings: price = (price per sqft) * sqft, where the\n",
    "        # per-sqft figure is parsed out of the facts string\n",
    "        marker = 'Price/sqft:'\n",
    "        facts = x['facts and features']\n",
    "        tail = facts[facts.find(marker) + len(marker) + 1:]\n",
    "        per_sqft = int(tail[tail.find('$') + 1:tail.find(', ') - 1])\n",
    "        return per_sqft * x['sqft']\n",
    "\n",
    "    if offer == 'rent':\n",
    "        # rentals: text between the first character and the last '/'\n",
    "        return int(listed[1:listed.rfind('/')].replace(',', ''))\n",
    "\n",
    "    # everything else: take the price column minus its first character\n",
    "    return int(listed[1:].replace(',', ''))\n",
    "\n",
    "# Classify a listing as 'house', 'condo' or 'unknown' from its title.\n",
    "def extractType(x):\n",
    "    title = x['title'].lower()\n",
    "    # 'house' is tested first because it takes precedence over the\n",
    "    # condo/apartment match when a title contains both\n",
    "    if 'house' in title:\n",
    "        return 'house'\n",
    "    if 'condo' in title or 'apartment' in title:\n",
    "        return 'condo'\n",
    "    return 'unknown'\n",
    "\n",
    "# Parse the bedroom count out of x['facts and features'].\n",
    "def extractBd(x):\n",
    "    facts = x['facts and features']\n",
    "\n",
    "    # keep only the text in front of the first ' bd' (all of it if absent)\n",
    "    head = facts\n",
    "    bd_pos = facts.find(' bd')\n",
    "    if bd_pos >= 0:\n",
    "        head = facts[:bd_pos]\n",
    "\n",
    "    # the number starts two characters past the last comma, or at the\n",
    "    # very beginning when there is no comma\n",
    "    start = 0\n",
    "    comma_pos = head.rfind(',')\n",
    "    if comma_pos >= 0:\n",
    "        start = comma_pos + 2\n",
    "    return int(head[start:])\n",
    "\n",
    "# Parse the square footage out of x['facts and features'].\n",
    "def extractSqft(x):\n",
    "    facts = x['facts and features']\n",
    "\n",
    "    # keep only the text in front of the first ' sqft' (all of it if absent)\n",
    "    head = facts\n",
    "    sqft_pos = facts.find(' sqft')\n",
    "    if sqft_pos >= 0:\n",
    "        head = facts[:sqft_pos]\n",
    "\n",
    "    # skip past the last 'ba ,' separator and the one character after it\n",
    "    start = 0\n",
    "    ba_pos = head.rfind('ba ,')\n",
    "    if ba_pos >= 0:\n",
    "        start = ba_pos + 5\n",
    "    # drop thousands separators before converting\n",
    "    return int(head[start:].replace(',', ''))\n",
    "\n",
    "# Map a listing title to 'sale', 'rent', 'sold' or 'foreclosed'; a title\n",
    "# matching none of these is returned lower-cased.\n",
    "def extractOffer(x):\n",
    "    title = x['title'].lower()\n",
    "    offer = title\n",
    "    # first match wins, in this order\n",
    "    if 'sale' in title:\n",
    "        offer = 'sale'\n",
    "    elif 'rent' in title:\n",
    "        offer = 'rent'\n",
    "    elif 'sold' in title:\n",
    "        offer = 'sold'\n",
    "    elif 'foreclose' in title:\n",
    "        offer = 'foreclosed'\n",
    "    return offer\n",
    "\n",
    "\n",
    "tstart = time.time()\n",
    "output_path = '/hot/scratch/out.csv'\n",
    "ds = c.csv('/hot/data/zillow/large10GB.csv')\n",
    "\n",
    "# Derive the columns step by step, keep condos with fewer than 10 bedrooms,\n",
    "# and write the selected columns to CSV. Each ignore(ValueError) follows a\n",
    "# parsing step (sqft, price) whose int() conversion can fail.\n",
    "(\n",
    "    ds.withColumn(\"bedrooms\", extractBd)\n",
    "    .filter(lambda x: x['bedrooms'] < 10)\n",
    "    .withColumn(\"type\", extractType)\n",
    "    .filter(lambda x: x['type'] == 'condo')\n",
    "    .withColumn(\"zipcode\", lambda x: '%05d' % int(x['postal_code']))\n",
    "    .withColumn(\"sqft\", extractSqft)\n",
    "    .ignore(ValueError)\n",
    "    .withColumn(\"offer\", extractOffer)\n",
    "    .withColumn(\"price\", extractPrice)\n",
    "    .ignore(ValueError)\n",
    "    .selectColumns([\"url\", \"zipcode\", \"bedrooms\", \"type\", \"price\"])\n",
    "    .tocsv(output_path)\n",
    ")\n",
    "\n",
    "print('processing 10GB of input data took: {:.2f}s'.format(time.time() - tstart))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Preview the first lines of out.part0.csv in the scratch directory\n",
    "# (presumably the first part file written by tocsv above; verify the name).\n",
    "!head /hot/scratch/out.part0.csv"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
